package org.yaac.server.egql.processor;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
import static com.google.common.collect.Lists.newLinkedList;
import static com.google.common.collect.Maps.newHashMap;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import org.yaac.server.egql.evaluator.EvaluationResult;
import org.yaac.server.egql.processor.ProcessData.ProcessDataRecord;
import org.yaac.server.util.AutoBeanUtil;
import org.yaac.shared.egql.EGQLConstant;
import org.yaac.shared.egql.Result;
import org.yaac.shared.egql.ResultCell;
import org.yaac.shared.property.PropertyInfo;

import com.google.appengine.api.channel.ChannelMessage;
import com.google.appengine.api.channel.ChannelService;
import com.google.appengine.api.channel.ChannelServiceFactory;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.common.annotations.VisibleForTesting;

/**
 * Terminal {@link Processor} that converts evaluated records into
 * {@link ResultCell} rows and pushes them to the client over the App Engine
 * channel service.
 * 
 * <p>The number of rows delivered is capped by a remaining-result quota that is
 * consumed across successive {@link #process(ProcessContext, ProcessData)}
 * calls on the same instance. Messages larger than
 * {@link EGQLConstant#MAX_RESULT_MSG_SIZE} are split into several smaller
 * messages before being sent.
 * 
 * @author Max Zhu (thebbsky@gmail.com)
 *
 */
public class ChannelMsgSender implements Processor {
	
	/**
	 * serialization version id
	 */
	private static final long serialVersionUID = 1L;

	private static final Logger logger = Logger.getLogger(ChannelMsgSender.class.getName());

	/**
	 * remaining max result to send back to client side; decremented once for
	 * every row that is emitted. Must not be null when
	 * {@link #process(ProcessContext, ProcessData)} is invoked, because it is
	 * unboxed while checking the quota.
	 */
	private Integer maxResult;
	
	/**
	 * No-arg constructor, never called from code in this class.
	 * NOTE(review): presumably kept for a serialization / reflection framework - confirm.
	 */
	@SuppressWarnings("unused")
	private ChannelMsgSender(){}
	
	/**
	 * @param maxResult maximum number of rows this sender may deliver to the client
	 */
	public ChannelMsgSender(Integer maxResult) {
		super();
		this.maxResult = maxResult;
	}

	/**
	 * Converts the input records to result cells, wraps them in a
	 * {@link Result} carrying the pipeline key and the context status, and
	 * sends the encoded result to the client identified by the context.
	 * 
	 * @param context supplies the pipeline key, status and client id
	 * @param input records to send
	 * @return always {@code null}; this processor produces no output data
	 */
	@Override
	public ProcessData process(ProcessContext context, ProcessData input) {
		// step 1 : convert data from evaluator to propertyInfo
		List<List<PropertyInfo>> propertyInfoResult = processProperty(input);
		
		// step 2 : resolve duplicated naming issues
		resolveDuplicate(propertyInfoResult);
		
		// step 3 : convert data from propertyInfo to result cell
		List<List<ResultCell>> resultCellResult = processResult(context, propertyInfoResult);
		
		// step 4 : serialize and send result 
		Result stmtResult = AutoBeanUtil.newResult(KeyFactory.keyToString(context.getPipelineKey()));
		stmtResult.setResult(resultCellResult);
		stmtResult.setStatus(context.getStatus());
		sendMsg(context.getClientId(), AutoBeanUtil.encode(Result.class, stmtResult));
		
		// nothing to return
		return null;
	}

	/**
	 * Makes property titles unique within each row: the first occurrence of a
	 * title is left untouched, every later occurrence gets a {@code _1},
	 * {@code _2}, ... suffix.
	 * 
	 * <p>Counters are keyed by the original title only, so a generated name is
	 * not checked against titles that already look like {@code name_1}.
	 * 
	 * @param propertyInfoResult rows whose titles are rewritten in place
	 */
	private void resolveDuplicate(List<List<PropertyInfo>> propertyInfoResult) {
		for (List<PropertyInfo> dataRow : propertyInfoResult) {
			// used to check duplicate name, scoped to a single row
			Map<String, Integer> duplicateNameCounter = newHashMap();
			
			for (PropertyInfo dataProperty : dataRow) {
				String title = dataProperty.getTitle();
				Integer counter = duplicateNameCounter.get(title);
				
				if (counter == null) {
					// first time this title is seen in the row
					duplicateNameCounter.put(title, 0);
				} else {
					int next = counter + 1;
					duplicateNameCounter.put(title, next);
					dataProperty.setTitle(title + "_" + next);
				}
			}
		}
	}

	/**
	 * convert from evaluation results to PropertyInfo, one list per input record
	 * 
	 * @param input
	 * @return one row of PropertyInfo for each record, in record order
	 */
	private List<List<PropertyInfo>> processProperty(ProcessData input) {
		List<ProcessDataRecord> records = input.getRecords();
		List<List<PropertyInfo>> results = newArrayListWithExpectedSize(records.size());
		
		for (ProcessDataRecord record : records) {
			List<PropertyInfo> resultRow = newLinkedList();
			
			// each evaluation result appends its own PropertyInfo entries to the row
			for (EvaluationResult r : record.asIterable()) {
				r.populatePropertyInfo(resultRow);
			}
			
			results.add(resultRow);
		}
		
		return results;
	}
	
	/**
	 * convert from PropertyInfo to ResultCell, stopping as soon as the
	 * remaining result quota is used up
	 * 
	 * @param context currently not used by the conversion
	 * @param data
	 * @return converted rows, at most as many as the remaining quota allows
	 */
	private List<List<ResultCell>> processResult(ProcessContext context, List<List<PropertyInfo>> data) {
		List<List<ResultCell>> result = new LinkedList<List<ResultCell>>();
		
		for (List<PropertyInfo> dataRow : data) {
			if (!allowMoreResult()) {
				break;
			}
			
			List<ResultCell> resultRow = new ArrayList<ResultCell>(dataRow.size());
			for (PropertyInfo dataProperty : dataRow) {
				ResultCell cell = dataProperty.populateResultCell(AutoBeanUtil.getResultCellFactory());
				resultRow.add(cell);
			}
			result.add(resultRow);
			
			decreaseRemainingResultQuota();
		}
		
		return result;
	}
	
	/**
	 * Sends an encoded {@link Result} to the client. A null or empty message is
	 * ignored. A message longer than {@link EGQLConstant#MAX_RESULT_MSG_SIZE}
	 * is decoded, split by rows and each part is sent recursively; when it
	 * cannot be split any further it is discarded.
	 * 
	 * @param clientId
	 * @param msg
	 */
	private void sendMsg(String clientId, String msg) {
		if (isNullOrEmpty(msg)) {
			logger.info("Empty msg is ignored");
			// must stop here, otherwise a null msg fails on msg.length() below
			return;
		}
		
		if (msg.length() <= EGQLConstant.MAX_RESULT_MSG_SIZE) {
			// send msg
			ChannelService channelService = ChannelServiceFactory.getChannelService();
			logger.info("sending msg : client id = " + clientId);
			logger.info("sending msg size = " + msg.length());
			channelService.sendMessage(new ChannelMessage(clientId, msg));
			return;
		}
		
		// split msg and send again
		Result allInOne = AutoBeanUtil.decode(Result.class, msg);
		int allInOneSize = allInOne.getResult() == null ? 0 : allInOne.getResult().size();
		
		if (allInOneSize <= 1) {	// can not send even one result, nothing left to split
			// TODO : notify client
			logger.info("message too big even for single record, discarded. Size = " + msg.length());
			return;
		}
		
		logger.info("message too big, splitting into small messages.....");
		
		// every chunk must hold fewer rows than the input, so the recursion always
		// makes progress no matter how the size constants relate to each other
		int chunkSize = Math.min(idealSize(allInOneSize, msg.length()), allInOneSize - 1);
		
		for (Result part : splitResult(allInOne, chunkSize)) {
			sendMsg(clientId, AutoBeanUtil.encode(Result.class, part));
		}
	}
	
	/**
	 * Estimates how many rows to put in each message when splitting: the row
	 * count divided (integer division) by
	 * {@code msgLength / IDEAL_RESULT_MSG_SIZE + 1}.
	 * 
	 * @param allInOneSize number of rows in the oversized result
	 * @param msgLength length of the encoded oversized message
	 * @return rows per message, never less than 1
	 */
	@VisibleForTesting int idealSize(int allInOneSize, int msgLength) {
		int idealSize = allInOneSize / (msgLength / EGQLConstant.IDEAL_RESULT_MSG_SIZE + 1);
		return idealSize == 0 ? 1 : idealSize; // minimum 1 
	}
	
	/**
	 * Splits a result into consecutive results of at most {@code splitSize}
	 * rows each. Statement key, status and timestamp are copied to every part.
	 * 
	 * @param allInOneResult result to split
	 * @param splitSize max rows per part, expected to be at least 1
	 * @return the parts in original row order; empty when there are no rows
	 */
	private List<Result> splitResult(Result allInOneResult, int splitSize) {
		List<Result> results = new LinkedList<Result>();
		
		Result curr = null;
		
		int idx = 0;
		for (List<ResultCell> row : allInOneResult.getResult()) {
			// start a new part every splitSize rows (including the very first row)
			if (idx % splitSize == 0) {
				curr = AutoBeanUtil.newResult(allInOneResult.getStatementKey());
				curr.setStatus(allInOneResult.getStatus());
				curr.setTimestamp(allInOneResult.getTimestamp());
				curr.setResult(new ArrayList<List<ResultCell>>(splitSize));
				
				results.add(curr);
			}
			idx++;
			
			curr.getResult().add(row);
		}
		
		return results;
	}
	
	/**
	 * @return true while at least one more row may be sent to the client
	 */
	private boolean allowMoreResult() {
		// strictly positive: with ">= 0" a quota of N would let N + 1 rows through
		return maxResult > 0;
	}

	/**
	 * consume one unit of the remaining result quota
	 */
	private void decreaseRemainingResultQuota() {
		this.maxResult --;
	}
}
